#!/usr/bin/env python
import os
import time

import pika
from jqdatasdk import *
# Open a blocking connection to the RabbitMQ broker on localhost and obtain a
# channel; every declaration and the consumer below go through this channel.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Exchange / queue / routing-key names.  Only the "dlx" exchange and queue are
# declared and consumed in the code visible here; the "normal" names are kept
# because other code may rely on them.
exchangeName = 'exchange.normal'
exchangeDlxName = 'exchange.dlx'
queueName = 'queue.normal'
queueDlxName = 'queue.dlx'
routing_key = 'routingkey'

# Declare a durable direct exchange and a durable, non-exclusive,
# non-auto-delete queue, then bind the queue to the exchange under
# ``routing_key``.  passive=False means the entities are created if missing.
channel.exchange_declare(
    exchange=exchangeDlxName,
    exchange_type='direct',
    passive=False,
    durable=True,
)
channel.queue_declare(
    queue=queueDlxName,
    passive=False,
    durable=True,
    exclusive=False,
    auto_delete=False,
)
channel.queue_bind(
    queue=queueDlxName,
    exchange=exchangeDlxName,
    routing_key=routing_key,
)
print(' [*] Waiting for messages. To exit press CTRL+C')

# Log in before consuming (``auth`` presumably comes from the jqdatasdk star
# import).  Credentials can be supplied through the environment so they do not
# have to live in the source tree.
# NOTE(review): the fallback literals are the values that were previously
# hard-coded here and are kept only for backward compatibility -- rotate them
# and drop the fallbacks once JQDATA_USERNAME / JQDATA_PASSWORD are provisioned.
auth(
    os.environ.get('JQDATA_USERNAME', '18928692919'),
    os.environ.get('JQDATA_PASSWORD', 'Qq123456'),
)

def callback(ch, method, properties, body):
    """Print a delivered message and acknowledge it.

    The signature is the one pika expects for ``on_message_callback``.

    Args:
        ch: Channel the message was delivered on; used to send the ack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties (not used).
        body: Raw message payload as ``bytes``.
    """
    # Decode leniently: a strict decode raises UnicodeDecodeError on a
    # non-UTF-8 payload, which would abort the consumer before the ack and
    # leave the message unacknowledged.  Invalid bytes are shown as U+FFFD.
    print(body.decode(errors='replace'))

    # NOTE: a per-message jqdatasdk lookup used to be sketched here (a query
    # on valuation.turnover_ratio, valuation.market_cap and indicator.eps
    # filtered by the decoded body, passed to get_fundamentals_continuously
    # and timed with time.time()).  It was commented out; restore it from
    # version control above the ack if it is needed again.

    print(" [x] Done")
    # Acknowledge only after the work above so the message is not
    # acknowledged if processing fails part-way through.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Limit this consumer to one unacknowledged message at a time, then register
# ``callback`` as the handler for the dead-letter queue.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queueDlxName, on_message_callback=callback)

try:
    # Blocks, dispatching deliveries to ``callback`` until interrupted.
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to exit: stop the consumer cleanly instead
    # of ending with a traceback.
    channel.stop_consuming()
finally:
    # Always release the broker connection.  Skip the close if the connection
    # is already down so a close error does not mask the original exception.
    if connection.is_open:
        connection.close()


